package com.gitee.jastee.kafka.tool;

import cn.hutool.core.lang.Tuple;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.gitee.jastee.kafka.admin.KafkaAdminClients;
import com.gitee.jastee.util.ParameterTool;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.ConfigResource;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static com.gitee.jastee.kafka.constant.KafkaConstants.*;

/**
 * Command-line style helpers for Kafka administration.
 *
 * <p>Every public method parses its {@code String[] args} with {@link ParameterTool}, builds a
 * {@link KafkaAdminClients} instance for the configured broker list and delegates the actual work
 * to it.
 *
 * <p>NOTE(review): none of these methods closes the {@link KafkaAdminClients} instance it creates.
 * Confirm whether that class holds resources that need explicit closing.
 *
 * @author Jast
 */
public class KafkaAdminTool {

    private static final Log log = LogFactory.get();

    /** Broker list used when {@code KAFKA_BOOTSTRAP_SERVERS_CONFIG} is not supplied. */
    private static final String DEFAULT_BOOTSTRAP_SERVERS = "0.0.0.0:9092";

    /** Sentinel meaning "no retention argument was supplied". */
    private static final long RETENTION_NOT_SET = -1L;

    /**
     * Deletes the data located before a given offset in one partition of a topic.
     *
     * <p>Reads the required arguments {@code DELETE_OFFSET_PARTITION}, {@code DELETE_OFFSET} and
     * {@code KAFKA_TOPIC}. All of them are read and parsed before the admin client is created, so
     * a missing or malformed argument fails before any client is constructed.
     *
     * @param args command-line style arguments understood by {@link ParameterTool#fromArgs}
     * @throws NumberFormatException if the partition or offset argument is not a valid number
     * @author Jast
     */
    public static void deleteRecordsByOffset(String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // Validate every required argument first; only then create the client.
        int partition = Integer.parseInt(parameterTool.getRequired(DELETE_OFFSET_PARTITION));
        long offset = Long.parseLong(parameterTool.getRequired(DELETE_OFFSET));
        String topicName = parameterTool.getRequired(KAFKA_TOPIC);

        Map<Integer, Long> partitionAndOffset = new HashMap<>();
        partitionAndOffset.put(partition, offset);

        KafkaAdminClients admin = newAdminClient(parameterTool);
        admin.deleteRecordsByOffset(topicName, partitionAndOffset);
    }

    /**
     * Creates a topic.
     *
     * <p>{@code KAFKA_TOPIC} is required. {@code KAFKA_NUM_PARTITIONS} defaults to 3 and {@code
     * KAFKA_NUM_REPLICATION_FACTOR} defaults to 1. When {@code KAFKA_RETENTION_MS} is absent (or
     * explicitly {@code -1}) the topic is created through the overload that takes no retention
     * argument; otherwise the given retention value is passed along.
     *
     * @param args command-line style arguments understood by {@link ParameterTool#fromArgs}
     * @throws ExecutionException propagated from the underlying admin call
     * @throws InterruptedException propagated from the underlying admin call
     * @author Jast
     */
    public static void createTopic(String[] args) throws ExecutionException, InterruptedException {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String topicName = parameterTool.getRequired(KAFKA_TOPIC);
        short partitionNum = parameterTool.getShort(KAFKA_NUM_PARTITIONS, (short) 3);
        short replicationFactor = parameterTool.getShort(KAFKA_NUM_REPLICATION_FACTOR, (short) 1);
        long retentionMs = parameterTool.getLong(KAFKA_RETENTION_MS, RETENTION_NOT_SET);

        KafkaAdminClients admin = newAdminClient(parameterTool);
        if (retentionMs == RETENTION_NOT_SET) {
            admin.createTopic(topicName, partitionNum, replicationFactor);
            log.info(
                    "创建Topic:{},分区数量:{},副本数量:{},数据有效期:默认",
                    topicName,
                    partitionNum,
                    replicationFactor);
        } else {
            admin.createTopic(topicName, partitionNum, replicationFactor, retentionMs);
            log.info(
                    "创建Topic:{},分区数量:{},副本数量:{},数据有效期:{}",
                    topicName,
                    partitionNum,
                    replicationFactor,
                    retentionMs);
        }
    }

    /**
     * Fetches the description and the configuration of a topic.
     *
     * @param args command-line style arguments understood by {@link ParameterTool#fromArgs};
     *     {@code KAFKA_TOPIC} is required
     * @return a {@link Tuple} whose first member is the topic-description map and whose second
     *     member is the topic-config map, as returned by {@link KafkaAdminClients}
     * @throws ExecutionException propagated from the underlying admin calls
     * @throws InterruptedException propagated from the underlying admin calls
     * @author Jast
     */
    public static Tuple describeTopic(String[] args)
            throws ExecutionException, InterruptedException {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String topicName = parameterTool.getRequired(KAFKA_TOPIC);

        KafkaAdminClients admin = newAdminClient(parameterTool);
        Map<String, TopicDescription> topicDescriptions = admin.describeTopic(topicName);
        Map<ConfigResource, Config> topicConfigs = admin.describeConfigTopic(topicName);
        return new Tuple(topicDescriptions, topicConfigs);
    }

    /**
     * Deletes a topic.
     *
     * @param args command-line style arguments understood by {@link ParameterTool#fromArgs};
     *     {@code KAFKA_TOPIC} is required
     * @throws ExecutionException propagated from the underlying admin call
     * @throws InterruptedException propagated from the underlying admin call
     */
    public static void deleteTopic(String[] args) throws ExecutionException, InterruptedException {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String topicName = parameterTool.getRequired(KAFKA_TOPIC);

        KafkaAdminClients admin = newAdminClient(parameterTool);
        admin.deleteTopic(topicName);
    }

    /**
     * Builds an admin client for the broker list given by {@code KAFKA_BOOTSTRAP_SERVERS_CONFIG},
     * falling back to {@link #DEFAULT_BOOTSTRAP_SERVERS} when that argument is absent.
     */
    private static KafkaAdminClients newAdminClient(ParameterTool parameterTool) {
        String brokerList =
                parameterTool.get(KAFKA_BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS);
        return new KafkaAdminClients(brokerList);
    }
}
